#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <netdb.h>
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>


#define MAXLINE 4096
#define DATA_BUFFER_SIZE 4096

/* EVLIST_X_ Private space: 0x1000-0xf000 */
#define EVLIST_ALL	(0xf000 | 0x9f)
#define MEMCACHED_CONN_DISPATCH(arg0, arg1)
#define EV_TIMEOUT	0x01
#define EV_READ		0x02
#define EV_WRITE	0x04
#define EV_SIGNAL	0x08
#define EV_PERSIST	0x10	/* Persistant event */

#ifdef WIN32
#define evutil_socket_t intptr_t
#else
#define evutil_socket_t int
#endif

#define ITEMS_PER_ALLOC 64
/******************************* TYPE DECLARATION ******************************/

struct event_base
{
    int event_no;
    int event_count;
};

enum protocol
{
    ascii_prot = 3, /* arbitrary value. */
    binary_prot,
    negotiating_prot /* Discovering the protocol */
};

enum network_transport
{
    local_transport, /* Unix sockets*/
    tcp_transport,
    udp_transport
};


/**
 * Possible states of a connection.
 */
enum conn_states
{
    conn_listening,  /**< the socket which listens for connections */
    conn_new_cmd     /**< Prepare connection for next command */
};

/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item
{
    int               sfd;
    enum conn_states  init_state;
    int               event_flags;
    int               read_buffer_size;
    enum network_transport     transport;
    CQ_ITEM          *next;
};
/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue
{
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
    pthread_cond_t  cond;
};
/**
 * Stats stored per-thread.
 */
struct thread_stats
{
    pthread_mutex_t   mutex;

};

//worker 线程结构体
typedef struct
{
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
} LIBEVENT_THREAD;

//主线程结构体
typedef struct
{
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
} LIBEVENT_DISPATCHER_THREAD;

/**
 * Global stats.
 */
struct stats
{
    pthread_mutex_t mutex;
    unsigned int  curr_conns;
    unsigned int  total_conns;
    uint64_t      rejected_conns;
    unsigned int  conn_structs;

};

/* When adding a setting, be sure to update process_stat_settings */
/**
 * Globally accessible settings as derived from the commandline.
 */
struct settings
{
    size_t maxbytes;
    int maxconns;
    int port;
    char *inter;
    int verbose;
    char *socketpath;   /* path to unix socket if using local socket */
    int num_threads;        /* number of worker (without dispatcher) libevent threads to run */
    enum protocol binding_protocol;

};

/** file scope variables **/
/******************************* FILE SCOPE VARIABLES ******************************/
/* Which thread we assigned a connection to most recently. */
static int last_thread = -1;
/* Lock for global stats */
static pthread_mutex_t stats_lock;
/*
 * Number of worker threads that have finished setting themselves up.
 */
static int init_count = 0;
static pthread_mutex_t init_lock;
static pthread_cond_t init_cond;

//static struct stats stats;
static struct settings settings;
static struct event_base *main_base;
static struct event_base *current_base = NULL;

/*
 * Each libevent instance has a wakeup pipe, which other threads
 * can use to signal that they've put a new connection on its queue.
 */
static LIBEVENT_THREAD *threads;
static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;

/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;


/******************************* EVENT_BASE ******************************/

struct event_base *
event_base_new()
{
    struct event_base *base = NULL;
    if ((base = (struct event_base *)calloc(1, sizeof(struct event_base))) == NULL)
    {
        printf("%d: calloc", __LINE__);
        return NULL;
    }
    return base;
}

void
event_base_free(struct event_base *base)
{
    if (base == NULL && current_base)
        base = current_base;
    /* If we're freeing current_base, there won't be a current_base. */
    if (base == current_base)
        current_base = NULL;
    /* Don't actually free NULL. */
    if (base == NULL)
    {
        printf("%d: no base to free", __LINE__);
        return;
    }
    free(base);
}

struct event_base *
event_init(void)
{
    struct event_base *base = event_base_new();

    if (base == NULL)
    {
        printf("%d: Unable to construct event_base", __LINE__);
        return NULL;
    }

    current_base = base;

    return (base);
}

//The related function of conn_queue
/******************************* CONNECTION QUEUE ******************************/

/*
 * Initializes a connection queue.
 */
static void cq_init(CQ *cq)
{
    pthread_mutex_init(&cq->lock, NULL);
    pthread_cond_init(&cq->cond, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/*
 * Looks for an item on a connection queue, but doesn't block if there isn't
 * one.
 * Returns the item, or NULL if no item is available
 */
static CQ_ITEM *cq_pop(CQ *cq)
{
    CQ_ITEM *item;

    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (NULL != item)
    {
        cq->head = item->next;
        if (NULL == cq->head)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);

    return item;
}

/*
 * Adds an item to a connection queue.
 */
static void cq_push(CQ *cq, CQ_ITEM *item)
{
    item->next = NULL;

    pthread_mutex_lock(&cq->lock);
    if (NULL == cq->tail)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_cond_signal(&cq->cond);
    pthread_mutex_unlock(&cq->lock);
}

/*
 * Returns a fresh connection queue item.
 */
static CQ_ITEM *cqi_new(void)
{
    CQ_ITEM *item = NULL;
    pthread_mutex_lock(&cqi_freelist_lock);
    if (cqi_freelist)
    {
        item = cqi_freelist;
        cqi_freelist = item->next;
    }
    pthread_mutex_unlock(&cqi_freelist_lock);

    if (NULL == item)
    {
        int i;

        /* Allocate a bunch of items at once to reduce fragmentation */
        item = (CQ_ITEM*)malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
        if (NULL == item)
            return NULL;

        /*
         * Link together all the new items except the first one
         * (which we'll return to the caller) for placement on
         * the freelist.
         */
        for (i = 2; i < ITEMS_PER_ALLOC; i++)
            item[i - 1].next = &item[i];

        pthread_mutex_lock(&cqi_freelist_lock);
        item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
        cqi_freelist = &item[1];
        pthread_mutex_unlock(&cqi_freelist_lock);
    }

    return item;
}


/*
 * Frees a connection queue item (adds it to the freelist.)
 */
static void cqi_free(CQ_ITEM *item)
{
    pthread_mutex_lock(&cqi_freelist_lock);
    item->next = cqi_freelist;
    cqi_freelist = item;
    pthread_mutex_unlock(&cqi_freelist_lock);
}

/******************************* GLOBAL STATS ******************************/

void STATS_LOCK()
{
    pthread_mutex_lock(&stats_lock);
}

void STATS_UNLOCK()
{
    pthread_mutex_unlock(&stats_lock);
}

/******************************* LIBEVENT_THREAD ******************************/

void conn_process(CQ_ITEM *item,  LIBEVENT_THREAD *me)
{
    int connfd,n;
    char    buff[4096];
    while(1)
    {
        connfd = item->sfd;
        {
            // 从客户端接收数据
            n = recv(connfd, buff, MAXLINE, 0);
            buff[n-1] = '\0';
            printf("工作线程ID＝%lu :从客户端接收到数据: %s\n",me->thread_id, buff);
            //close(connfd);

            // 向客户端发送数据
            char szText[44] = " TCP Server Demo! \r\n";
            sprintf(szText,"刚刚发送的内容为：[%s]，服务器线程ID＝%lu\n",buff,pthread_self());
            send(connfd, szText, strlen(szText), 0);

            //close(connfd);
        }

        close(connfd);
        break;
    }
}

/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
static void thread_libevent_process(int fd, short which, void *arg)
{
    LIBEVENT_THREAD *me = (LIBEVENT_THREAD *)arg;
    CQ_ITEM *item;
    char buf[1];

    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    switch (buf[0])
    {
    case 'c':
        item = cq_pop(me->new_conn_queue);

        if (NULL != item)
        {
            conn_process(item, me);
        }
        cqi_free(item);

        break;
    }
    /* we were told to flip the lock type and report in */


}

static void wait_for_thread_registration(int nthreads)
{
    while (init_count < nthreads)
    {
        pthread_cond_wait(&init_cond, &init_lock);
    }
}

static void register_thread_initialized(void)
{
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);
}

/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg)
{
    LIBEVENT_THREAD *me = (LIBEVENT_THREAD *)arg;

    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */

    /* set an indexable thread-specific memory item for the lock type.
     * this could be unnecessary if we pass the conn *c struct through
     * all item_lock calls...
     */
    //me->item_lock_type = ITEM_LOCK_GRANULAR;
    //pthread_setspecific(item_lock_type_key, &me->item_lock_type);

    register_thread_initialized();

    //event_base_loop(me->base, 0);
    while(1)
    {
        thread_libevent_process(me->notify_receive_fd, 0, me);
    }
    return NULL;
}

/*
 * Creates a worker thread.
 */
static void create_worker(void *(*func)(void *), void *arg)
{
    //pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0)
    {
        perror("pthread_create");
        exit(1);
    }
}


/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me)
{
    me->base = event_init();
    if (! me->base)
    {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    me->new_conn_queue = (struct conn_queue *)malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL)
    {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0)
    {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

}

void methread_init(int nthreads, struct event_base *main_base)
{
    int         i;
//	int         power;


    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;


    threads = (LIBEVENT_THREAD *)calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads)
    {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    for (i = 0; i < nthreads; i++)
    {
        int fds[2];
        if (pipe(fds))
        {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        ////比如libevent句柄，连接队列等的初始化
        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        //stats.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++)
    {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}

/******************************* LIBEVENT_DISPATCHER_THREAD ******************************/

/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, either during initialization (for UDP) or because
 * of an incoming connection.
 */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport)
{
    CQ_ITEM *item = cqi_new();
    char buf[1];
    int tid = (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = 'c';
    if (write(thread->notify_send_fd, buf, 1) != 1)
    {
        perror("Writing to thread notify pipe");
    }
}

/*
 * Wait continously for events.  We exit only if no events are left.
 */
void event_dispatch_loop(int listenfd)
{
    int connfd;

    printf("======等待客户端的请求======\n");
    while(1)
    {
        if( (connfd = accept(listenfd, (struct sockaddr*)NULL, NULL)) == -1)
        {
            perror("accept");
            continue;
        }

        dispatch_conn_new(connfd, conn_new_cmd, EV_READ | EV_PERSIST,
                          DATA_BUFFER_SIZE, tcp_transport);

        //close(connfd);
    }
}

int listening_socket()
{
    //int     connfd;
    struct sockaddr_in     servaddr;
    //char    buff[4096];
    //int     n;

    int listenfd;
    if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1 )
    {
        perror("socket");
        exit(0);
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(6666);

    int optval = 1;
    if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(int)) == -1)
    {
        perror("setsockopt");
        exit(0);
    }

    if( bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1)
    {
        perror("bind");
        exit(0);
    }

    if( listen(listenfd, 10) == -1)
    {
        perror("listen");
        exit(0);
    }
    return listenfd;
}

/******************************* MAIN ******************************/

int main()
{
    int    listenfd;
    settings.num_threads=3;
    /* initialize main thread libevent instance */
    main_base = event_init();

    /* start up worker threads if MT mode */
    methread_init(settings.num_threads, main_base);

    /* create the listening socket, bind it, and init */
    listenfd = listening_socket();
    printf("主线程ID＝%lu\n",pthread_self());

    /* Give the sockets a moment to open. I know this is dumb, but the error
     * is only an advisory.
     */
    usleep(1000);

    /* enter the event loop */
    event_dispatch_loop(listenfd);

    close(listenfd);
    return 0;
}




